/*
 * Copyright (c) 2008-2023, Hazelcast, Inc. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.hazelcast.huaweicloud;

import com.hazelcast.cluster.Address;
import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import com.hazelcast.spi.discovery.AbstractDiscoveryStrategy;
import com.hazelcast.spi.discovery.DiscoveryNode;
import com.hazelcast.spi.discovery.DiscoveryStrategy;
import com.hazelcast.spi.discovery.SimpleDiscoveryNode;
import com.hazelcast.spi.exception.RestClientException;
import com.hazelcast.spi.partitiongroup.PartitionGroupMetaData;
import com.hazelcast.spi.utils.PortRange;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Collection;

import static com.hazelcast.huaweicloud.HwProperties.ACCESS_KEY;
import static com.hazelcast.huaweicloud.HwProperties.SECRET_KEY;
import static com.hazelcast.huaweicloud.HwProperties.IAM_DOMAIN;
import static com.hazelcast.huaweicloud.HwProperties.IAM_USER;
import static com.hazelcast.huaweicloud.HwProperties.IAM_PASSWORD;
import static com.hazelcast.huaweicloud.HwProperties.REGION;
import static com.hazelcast.huaweicloud.HwProperties.PORT;
import static com.hazelcast.huaweicloud.HwProperties.INSTANCE_ID;
import static com.hazelcast.huaweicloud.HwProperties.PROJECT_ID;
import static com.hazelcast.huaweicloud.HwProperties.TAG_KEY;
import static com.hazelcast.huaweicloud.HwProperties.TAG_VALUE;
import static com.hazelcast.huaweicloud.HwProperties.INSTANCE_METADATA_AVAILABLE;

/**
 * Ecs implementation of {@link DiscoveryStrategy}
 */

public class HwDiscoveryStrategy extends AbstractDiscoveryStrategy {
    private static final ILogger LOGGER = Logger.getLogger(HwDiscoveryStrategy.class);
    private static final int HTTP_FORBIDDEN = 403;
    private static final String DEFAULT_PORT = "5701-5703";
    private final HwClient hwClient;
    private final PortRange portRange;
    private final Map<String, String> memberMetadata = new HashMap<>();
    private boolean isKnownExceptionAlreadyLogged;
    private boolean isEmptyAddressListAlreadyLogged;

    public HwDiscoveryStrategy(Map<String, Comparable> properties) {
        super(LOGGER, properties);
        try {
            HwMetadataApi hwMetadataApi = new HwMetadataApi();
            HwAuthenticator hwAuthenticator = new HwAuthenticator();
            HwApi hwApi = new HwApi();
            HwConfig hwConfig = createEcsConfig().build();
            this.hwClient = new HwClient(hwApi, hwMetadataApi, hwAuthenticator, hwConfig);
            this.portRange = hwConfig.getPort();
        } catch (IllegalArgumentException e) {
            throw new InvalidConfigurationException("Invalid Ecs Discovery Strategy configuration", e);
        }
    }

    // for test only
    HwDiscoveryStrategy(Map<String, Comparable> properties, HwClient hwClient) {
        super(LOGGER, properties);
        this.hwClient = hwClient;
        this.portRange = createEcsConfig().build().getPort();
    }

    // Use the EcsProperties class to parse the property names in the file,
    // and set the corresponding values to the EcsConfig class to store configuration property values
    private HwConfig.Builder createEcsConfig() {
        try {
            return HwConfig.builder().setAccessKey(getOrNull(ACCESS_KEY)).setSecretKey(getOrNull(SECRET_KEY))
                    .setInstanceId(getOrNull(INSTANCE_ID)).setRegion(getOrNull(REGION))
                    .setPort(new PortRange(getPortRange())).setTagKey(getOrNull(TAG_KEY))
                    .setTagValue(getOrNull(TAG_VALUE)).setIamDomain(getOrNull(IAM_DOMAIN))
                    .setIamUser(getOrNull(IAM_USER)).setIamPassword(getOrNull(IAM_PASSWORD))
                    .setProjectId(getOrNull(PROJECT_ID))
                    .setInstanceMetadataAvailable(getOrDefault(INSTANCE_METADATA_AVAILABLE.getDefinition(), true));
        } catch (IllegalArgumentException e) {
            throw new InvalidConfigurationException("Ecs configuration is not valid", e);
        }
    }

    private String getPortRange() {
        Object rangeConfig = getOrNull(PORT.getDefinition());
        if (rangeConfig == null) {
            return DEFAULT_PORT;
        }
        return rangeConfig.toString();
    }

    private String getOrNull(HwProperties hwProperties) {
        return getOrNull(hwProperties.getDefinition());
    }

    /**
     * Use the ecsClient object to obtain address information, and assign it to the addresses variable,
     * obtain the value corresponding to the key variable in the addresses variable,and assign it to the ip variable.
     * Judge according to the ip variable and assign it to privateAddress variable or publicAddress variable respectively.
     * Create a SimpleDiscoveryNode object, representing a simple network node, and add it to the result variable.
     */
    @Override
    public Collection<DiscoveryNode> discoverNodes() {
        try {
            LOGGER.info("----------------using EcsDiscoveryStrategy----------------");
            // get privateIp and publicIp from EcsClient
            Map<String, String> addresses = hwClient.getAddresses();
            logResult(addresses);
            List<DiscoveryNode> result = new ArrayList<>();

            // create DiscoveryNode for every ip and port,and add them to Collection
            for (int port = portRange.getFromPort(); port <= portRange.getToPort(); port++) {
                Address privateAddress = null;
                Address publicAddress = null;
                for (String key : addresses.keySet()) {
                    String ip = addresses.getOrDefault(key, null);
                    if (("privateIp").equals(key)) {
                        privateAddress = new Address(ip, port);
                    } else {
                        publicAddress = new Address(ip, port);
                    }
                }
                if (privateAddress != null) {
                    result.add(new SimpleDiscoveryNode(privateAddress, publicAddress));
                }
            }
            return result;
        } catch (RestClientException e) {
            if (e.getHttpErrorCode() == HTTP_FORBIDDEN) {
                if (!isKnownExceptionAlreadyLogged) {
                    LOGGER.warning("Required role is not assigned to service principal!");
                    isKnownExceptionAlreadyLogged = true;
                }
                LOGGER.finest(e);
            } else {
                LOGGER.warning("Cannot discover nodes. Starting standalone.", e);
            }
        } catch (Exception e) {
            LOGGER.warning("Cannot discover nodes. Starting standalone.", e);
        }
        return Collections.emptyList();
    }

    private void logResult(Map<String, String> addresses) {
        if (addresses.isEmpty() && !isEmptyAddressListAlreadyLogged) {
            LOGGER.warning("No IP addresses found! Starting standalone.");
            isEmptyAddressListAlreadyLogged = true;
        }

        LOGGER.info(String.format("Found the following (private => public) addresses: %s", addresses));
    }

    @Override
    public Map<String, String> discoverLocalMetadata() {
        if (memberMetadata.isEmpty()) {
            memberMetadata.put(PartitionGroupMetaData.PARTITION_GROUP_ZONE, hwClient.getAvailabilityZone());
        }
        return memberMetadata;
    }
}
